package net.courier.mq.provider.gearman;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import net.courier.mq.ExchangeType;
import net.courier.mq.MessageHandleServer;
import net.courier.mq.MessageHandler;
import net.courier.mq.Protocol;

import org.gearman.common.GearmanNIOJobServerConnectionFactory;
import org.gearman.worker.GearmanFunction;
import org.gearman.worker.GearmanFunctionFactory;
import org.gearman.worker.GearmanWorker;
import org.gearman.worker.GearmanWorkerImpl;

/**
 * {@link MessageHandleServer} implementation backed by a Gearman worker.
 *
 * <p>Each registered {@link MessageHandler} is exposed to the job servers as a
 * Gearman function named after its key. Gearman has no exchange concept, so
 * {@link #setExchangeType(ExchangeType)} and {@link #setExchangeName(String)}
 * always throw {@link UnsupportedOperationException}.</p>
 *
 * <p>Typical usage: configure hosts (and optionally an executor and function
 * timeout), register handlers, then call {@link #start()}. Handler registration,
 * worker initialisation and {@link #shutdown()} are guarded by this object's
 * monitor.</p>
 */
public class GearmanMessageHandleServer implements MessageHandleServer {
	/** Size of the fixed thread pool created when no executor was supplied. */
	private static final int DEFAULT_NUM_THREADS = 5;

	/** Worker created by {@link #start()}; {@code null} until the first start. */
	private GearmanWorker gearmanWorker;
	/** Raw host specification, parsed by {@code GearmanAddress.parseAddresses} on start. */
	private String hosts;
	/** Optional caller-supplied executor handed to the worker. */
	private ExecutorService executor;
	/** Registered function factories keyed by handler key. */
	private final Map<String, HandlerFunctionFactory> functionFactories = new HashMap<String, HandlerFunctionFactory>();
	/** Timeout passed to the worker with every function registration. */
	private long functionTimeout = 0;
	/** True from successful initialisation in {@link #start()} until {@code work()} returns. */
	private boolean started;

	/** Addresses parsed from {@link #hosts} during the last start. */
	private GearmanAddress[] knownHosts;

	/**
	 * Sets the job server host specification used on the next {@link #start()}.
	 *
	 * @param hosts host specification in the format accepted by
	 *              {@code GearmanAddress.parseAddresses}
	 */
	@Override
	public void setHost(String hosts) {
		this.hosts = hosts;
	}

	/**
	 * Not supported by the Gearman provider.
	 *
	 * @throws UnsupportedOperationException always
	 */
	@Override
	public void setExchangeType(ExchangeType type) {
		throw new UnsupportedOperationException("Gearman的消息机制不支持ExchangeType");
	}

	/**
	 * Not supported by the Gearman provider.
	 *
	 * @throws UnsupportedOperationException always
	 */
	@Override
	public void setExchangeName(String exchangeName) {
		throw new UnsupportedOperationException("Gearman的消息机制不支持exchangeName");
	}

	/**
	 * Registers a handler under the given key. A key that is already registered
	 * is left untouched and the call is silently ignored.
	 *
	 * <p>If a worker has already been created by {@link #start()}, the new
	 * function is registered with it immediately; otherwise it is registered
	 * when the worker is started.</p>
	 *
	 * @param key      function name under which the handler is exposed
	 * @param handler  handler invoked for jobs of that function
	 * @param protocol protocol passed through to the {@code FunctionHandler}
	 */
	@Override
	public synchronized void registerMessageHandler(String key, MessageHandler handler, Protocol<?, byte[]> protocol) {
		if (functionFactories.containsKey(key)) {
			return;
		}

		HandlerFunctionFactory factory = new HandlerFunctionFactory(key, new FunctionHandler(handler, key, protocol));
		functionFactories.put(key, factory);

		if (gearmanWorker != null) {
			gearmanWorker.registerFunctionFactory(factory, functionTimeout);
		}
	}

	/**
	 * Creates the worker, connects it to the configured hosts, registers all
	 * known functions and then runs the worker on the calling thread.
	 *
	 * <p>Initialisation happens while holding this object's monitor, but
	 * {@code work()} is invoked outside of it. NOTE(review): in the gearman-java
	 * library {@code GearmanWorker.work()} is expected to loop until the worker
	 * is shut down; holding the monitor for that long would prevent
	 * {@link #shutdown()} and {@link #registerMessageHandler} from ever running
	 * on another thread. Confirm against the library version in use.</p>
	 *
	 * @throws IllegalStateException if a previous {@code start()} call has not
	 *                               returned yet
	 */
	@Override
	public void start() {
		final GearmanWorker worker;
		synchronized (this) {
			if (started) {
				throw new IllegalStateException("GearmanMessageHandleServer is already started");
			}

			initWorker();
			initKnownHosts();
			registerFunctionFactory();

			worker = gearmanWorker;
			// Set only after successful initialisation so a failed start can be retried.
			started = true;
		}

		try {
			// Run outside the monitor so other threads can call shutdown() meanwhile.
			worker.work();
		} finally {
			synchronized (this) {
				started = false;
			}
		}
	}

	/**
	 * Shuts the worker down if one exists and reports itself as running;
	 * otherwise does nothing.
	 */
	@Override
	public synchronized void shutdown() {
		if (this.gearmanWorker != null && this.gearmanWorker.isRunning()) {
			this.gearmanWorker.shutdown();
		}
	}

	/**
	 * Sets the timeout passed to the worker for every function registered
	 * after this call.
	 *
	 * @param functionTimeout timeout value forwarded to
	 *                        {@code registerFunctionFactory} (unit defined by the Gearman library)
	 */
	public void setFunctionTimeout(long functionTimeout) {
		this.functionTimeout = functionTimeout;
	}

	/**
	 * Sets the executor handed to the worker on the next {@link #start()}.
	 * When left {@code null}, a fixed pool of {@value #DEFAULT_NUM_THREADS}
	 * threads is created instead.
	 *
	 * @param executor executor to use, or {@code null} for the default pool
	 */
	public void setExecutor(ExecutorService executor) {
		this.executor = executor;
	}

	/** Creates a fresh worker using the configured executor or a default fixed pool. */
	private void initWorker() {
		ExecutorService workerExecutor = executor;
		if (workerExecutor == null) {
			workerExecutor = Executors.newFixedThreadPool(DEFAULT_NUM_THREADS);
		}
		gearmanWorker = new GearmanWorkerImpl(workerExecutor);
	}

	/** Registers every known function factory with the current worker. */
	private void registerFunctionFactory() {
		for (GearmanFunctionFactory factory : functionFactories.values()) {
			gearmanWorker.registerFunctionFactory(factory, functionTimeout);
		}
	}

	/** Parses the configured hosts and adds one NIO connection per address to the worker. */
	private void initKnownHosts() {
		GearmanNIOJobServerConnectionFactory connectionFactory = new GearmanNIOJobServerConnectionFactory();
		knownHosts = GearmanAddress.parseAddresses(hosts);
		for (GearmanAddress host : knownHosts) {
			gearmanWorker.addServer(connectionFactory.createConnection(host.getHost(), host.getPort()));
		}
	}

	/**
	 * Last-resort cleanup: shuts the worker down when this object is finalized.
	 * Callers should still invoke {@link #shutdown()} explicitly.
	 */
	@Override
	protected void finalize() throws Throwable {
		shutdown();
		super.finalize();
	}

	/** Function factory that always hands out the same pre-built function for a fixed name. */
	private static class HandlerFunctionFactory implements GearmanFunctionFactory {
		private final String key;
		private final GearmanFunction function;

		HandlerFunctionFactory(String key, GearmanFunction function) {
			this.key = key;
			this.function = function;
		}

		/** @return the key the handler was registered under */
		@Override
		public String getFunctionName() {
			return key;
		}

		/** @return the shared function instance wrapping the registered handler */
		@Override
		public GearmanFunction getFunction() {
			return function;
		}

	}

}
